eb693bdf8ffe69b548efe91cb9e8a027898dfb19,google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java,SubscriberImplTest,testStreamAckDeadlineUpdate,#,308
Before Change
getTestSubscriberBuilder(testReceiver)
.setAckExpirationPadding(Duration.standardSeconds(1))
.build();
subscriber.startAsync().awaitRunning();
fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);
After Change
@Test
public void testStreamAckDeadlineUpdate() throws Exception {
if (!isStreamingTest) {
// This test is not applicable to polling.
return;
}
Subscriber subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setAckExpirationPadding(Duration.standardSeconds(1)));
fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);
// Send messages to be acked
testReceiver.setExplicitAck(true);
sendMessages(ImmutableList.of("A"));
// Make the ack latency of the receiver equals 20 seconds
fakeExecutor.advanceTime(Duration.standardSeconds(20));
testReceiver.replyNextOutstandingMessage();
// Wait for an ack deadline update
fakeExecutor.advanceTime(Duration.standardSeconds(60));
fakeSubscriberServiceImpl.waitForStreamAckDeadline(20);
// Send more messages to be acked
testReceiver.setExplicitAck(true);
for (int i = 0; i < 999; i++) {
sendMessages(ImmutableList.of(Integer.toString(i)));
}
// Reduce the 99th% ack latency of the receiver to 10 seconds
fakeExecutor.advanceTime(Duration.standardSeconds(10));
for (int i = 0; i < 999; i++) {
testReceiver.replyNextOutstandingMessage();
}
// Wait for an ack deadline update
fakeExecutor.advanceTime(Duration.standardSeconds(60));
fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);
subscriber.stopAsync().awaitTerminated();
}
@Test